Spark Agent
Spark Agent JARs
The latest version is 0.60.5.
Download the relevant version:
# Latest
wget https://user:[email protected]/java/definity-spark-agent-[spark.version]-latest.jar
# Specific version
wget https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar
Latest builds:
- Spark 2.3: spark-agent
- Spark 2.4: spark-agent
- Spark 3.1: spark-agent
- Spark 3.2: spark-agent, spark-iceberg 1.2
- Spark 3.3: spark-agent, spark-iceberg 1.2
- Spark 3.4: spark-agent
- Spark 3.5: spark-agent
Configuration Options
Technical Parameters
Name | Details |
---|---|
spark.jars | URL of definity-spark-agent-X.X.jar (and optionally definity-spark-iceberg-1.2-X.X.jar ) |
spark.plugins | Add ai.definity.spark.plugin.DefinitySparkPlugin (for Spark 3.x) |
spark.extraListeners | Add ai.definity.spark.AppListener (for Spark 2.x) |
spark.definity.server | Definity server URL (e.g., https://app.definity.run ) |
spark.definity.api.token | Integration token (required for SaaS usage) |
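For example, these parameters can be set directly on the SparkSession builder; a minimal sketch for a Spark 3.x SaaS setup (the app name, jar path, and token value below are placeholders):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("my_app")
    # Ship the agent jar and register the driver plugin (Spark 3.x)
    .config("spark.jars", "definity-spark-agent-X.X.jar")
    .config("spark.plugins", "ai.definity.spark.plugin.DefinitySparkPlugin")
    # Point the agent at the Definity server and authenticate (SaaS)
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.api.token", "<your-integration-token>")
    .getOrCreate()
)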
Applicative Parameters
Name | Details |
---|---|
spark.definity.env.name | Environment name; defaults to default |
spark.definity.pipeline.name | Pipeline name; defaults to spark.app.name |
spark.definity.pipeline.pit | Point-in-Time grouping for tasks; defaults to the current time |
spark.definity.task.name | Logical task name, stable across runs; defaults to spark.app.name |
Advanced
Name | Details |
---|---|
spark.definity. | Enables or disables functionality with options: true , false , or opt-in (default: true ). For opt-in , users can toggle this in the pipeline settings page. |
spark.definity. | Used for grouping tasks in the same run. |
spark.definity. | User defined task ID to show in the UI and notifications (e.g., YARN run ID); defaults to spark.app.name . |
spark.definity. | Comma-separated tags, supports key:value format (e.g., team:team-A ). |
spark.definity. | Comma-separated list of notification recipient emails. |
spark.definity. | Interval in seconds for sending heartbeat to the server; defaults to 60 . |
spark.definity. | Number of retries for server request errors; defaults to 1 . |
spark.definity. | Comma-separated list of tables to ignore. Names can be full (e.g., db_a.table_a ) or partial (e.g., table_a ), which applies to all databases. |
spark.definity. | Regular expression to extract time partitions from file names. Defaults to ^.*?(?=/\d+/|/[^/]*=[^/]*/) . Set empty to disable. |
spark.definity. | Enables Delta instrumentation; defaults to true . Set to false to opt-out. |
spark.definity. | Maximum number of allowed inputs per query; defaults to 100 . |
spark.definity. | Enables default session for multi-concurrent SparkSession apps; defaults to true . Set to false to disable. |
spark.definity. | Maximum duration in seconds for the default session before rotation; defaults to 3600 . |
spark.definity. | Enable in-flight data distribution metrics; defaults to false . |
spark.definity. | Enable debug logs; defaults to false . |
spark.definity. | Enable auto detection of tasks in Databricks multi-task workflows; defaults to false . |
spark.definity. | Flag to enable reporting of events; defaults to true . |
spark.definity. | Maximum number of events to report in one task; defaults to 5000 . |
spark.definity. | Threshold for deciding when execution planning is too slow and triggering an event; defaults to 60 . |
spark.definity. | Enables the executor-side plugin when the Definity plugin is configured; defaults to true . |
Metrics Calculation
Name | Details |
---|---|
spark.definity. | Number of threads for metrics calculation; defaults to 2 . |
spark.definity. | Timeout for metrics calculation, in seconds; defaults to 180 . |
spark.definity. | Maximum number of values for histogram distribution; defaults to 10 . |
spark.definity. | Specifies whether to extract metrics from Spark's ExecutorMetricsUpdate event; defaults to true . |
spark.definity. | Time-series metrics bucket size in seconds; defaults to 60 . |
spark.definity. | Total container memory for the driver in bytes (for client mode). |
spark.definity. | Total heap memory for the driver in bytes (for client mode). |
Custom Metrics
To report custom metrics, return two columns from your query:
- definity_metric_name (string)
- definity_metric_value (numeric)
Example:
spark.sql(
"select 'my_new_metric' as definity_metric_name, 1.5 as definity_metric_value"
).collect()
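The same two columns can also be produced with the DataFrame API; a minimal sketch, assuming the agent picks the metric up from any executed query (the metric name and value below are placeholders):

from pyspark.sql import functions as F

(
    spark.range(1)
    .select(
        F.lit("my_new_metric").alias("definity_metric_name"),
        F.lit(1.5).alias("definity_metric_value"),
    )
    .collect()
)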
Output Diversion
Useful for CI shadow-run flows.
Name | Details |
---|---|
spark.definity. | Suffix to add to all output tables |
spark.definity. | Suffix to add to all output tables' database name |
spark.definity. | Base location for all the created output databases |
spark.definity. | Base location for output files. Either a full base path, to divert all files to a single location regardless of their original location, or a partial path, to keep each file in its own bucket under a different base directory (e.g., gs://my-tests-bucket or my-tests-base-dir ). |
Skew Events
Skew events are calculated in the executors and use Spark's plugins mechanism.
Name | Details |
---|---|
spark.definity. | Interval in milliseconds between consecutive polling requests from executor to driver when using the Definity plugin; defaults to 20000 . |
spark.definity. | Minimum difference in seconds between suspected skewed task duration and the average task duration in its stage; defaults to 60 . |
spark.definity. | Minimum ratio between suspected skewed task duration and the average task duration in its stage; defaults to 2 . |
spark.definity. | Sampling ratio of task rows (e.g., 0.01 equals 1% sampling); defaults to 0.01 . |
spark.definity. | Maximum number of sampled rows per task; defaults to 1000 . |
spark.definity. | Maximum number of reported keys per task; defaults to 10 . |
Examples
PySpark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("demo_pyspark")
    .config("spark.jars", "definity-spark-agent-X.X.jar")
    .config("spark.plugins", "ai.definity.spark.plugin.DefinitySparkPlugin")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.env.name", "dev")
    .config("spark.definity.pipeline.name", "demo-pipeline")
    .config("spark.definity.pipeline.pit", "2023-04-01")
    .config("spark.definity.task.name", "demo-spark-task")
    .enableHiveSupport()
    .getOrCreate()
)
Airflow Integration
Using Jinja templating:
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="spark_dag",
) as dag:
    op1 = BashOperator(
        task_id="spark_task",
        bash_command="spark-submit ... \
            --jars ...,definity-spark-agent-X.X.jar \
            --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin \
            --conf spark.definity.server=https://app.definity.run \
            --conf spark.definity.env.name=dev \
            --conf spark.definity.pipeline.name='{{ dag_run.dag_id }}' \
            --conf spark.definity.pipeline.pit='{{ ts }}' \
            --conf spark.definity.task.name='{{ ti.task_id }}' \
            ...",
    )